このドキュメントでは、Spring BatchプロジェクトにおけるJob停止機能のコア実装メカニズムについて詳細に分析します。このプロジェクトは、非同期Job実行と優雅な停止機能を提供する企業レベルのバッチ処理システムです。
@PostMapping("/api/jobs/{executionId}/stop")
public ResponseEntity<Map<String, Object>> stopJob(@PathVariable Long executionId) {
Map<String, Object> response = new HashMap<>();
try {
boolean stopped = jobService.stopJob(executionId);
response.put("success", stopped);
response.put("message", stopped ? "Job停止リクエストが送信されました" : "Job停止に失敗しました");
return ResponseEntity.ok(response);
} catch (Exception e) {
response.put("success", false);
response.put("message", "Job停止中にエラーが発生しました: " + e.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}
主な責任:
public boolean stopJob(Long executionId) {
try {
JobExecution jobExecution = jobExplorer.getJobExecution(executionId);
if (jobExecution != null && jobExecution.isRunning()) {
// Spring Batch標準の停止メカニズム
jobOperator.stop(executionId);
// カスタム停止フラグの設定
jobStopManager.setStopFlag(executionId);
logger.info("Job停止リクエストが送信されました。実行ID: {}", executionId);
return true;
}
return false;
} catch (Exception e) {
logger.error("Job停止中にエラーが発生しました。実行ID: {}", executionId, e);
return false;
}
}
主な責任:
@Component
public class JobStopManager {
private final ConcurrentHashMap<Long, AtomicBoolean> stopFlags = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Long, Thread> jobThreads = new ConcurrentHashMap<>();
public void setStopFlag(Long executionId) {
stopFlags.put(executionId, new AtomicBoolean(true));
Thread jobThread = jobThreads.get(executionId);
if (jobThread != null && jobThread.isAlive()) {
jobThread.interrupt(); // 強制的にスレッドを中断
}
}
public boolean shouldStop(Long executionId) {
AtomicBoolean stopFlag = stopFlags.get(executionId);
return stopFlag != null && stopFlag.get();
}
public void registerJobThread(Long executionId, Thread thread) {
jobThreads.put(executionId, thread);
}
public void clearStopFlag(Long executionId) {
stopFlags.remove(executionId);
jobThreads.remove(executionId);
}
}
主な責任:
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
Long executionId = chunkContext.getStepContext().getStepExecution().getJobExecutionId();
// 現在のスレッドを登録
jobStopManager.registerJobThread(executionId, Thread.currentThread());
try {
// ビジネス処理ループ
while (!jobStopManager.shouldStop(executionId)) {
// 具体的なビジネスロジック
processBusinessLogic();
// スレッド中断チェック
if (Thread.currentThread().isInterrupted()) {
logger.info("Job実行が中断されました。実行ID: {}", executionId);
break;
}
}
return RepeatStatus.FINISHED;
} finally {
jobStopManager.clearStopFlag(executionId);
}
}
主な責任:
JobOperator.stop()を使用してSpring Batchフレームワークレベルでの停止JobStopManagerを通じてアプリケーションレベルでの停止制御ConcurrentHashMapを使用して停止フラグを保存AtomicBooleanで操作の原子性を保証Thread.interrupt()による強制スレッド中断停止リクエスト: Webクライアントが/api/jobs/{executionId}/stopエンドポイントにPOSTリクエストを送信
リクエスト処理: JobControllerがリクエストを受信し、JobService.stopJob()を呼び出し
状態検証: JobServiceがJobExplorerを通じてJob実行状態を確認
二重停止実行:
JobOperator.stop()でSpring Batch標準停止JobStopManager.setStopFlag()でカスタム停止フラグ設定スレッド中断: JobStopManagerが対応するJobスレッドを中断
停止検出: AsyncBusinessJobTaskletが停止フラグをチェックし、ビジネス処理を停止
リソースクリーンアップ: finallyブロックで停止フラグとスレッド参照をクリア
// ビジネス処理ループ内で定期的にチェック
while (!jobStopManager.shouldStop(executionId)) {
// ビジネスロジック
if (processedCount % 100 == 0) {
// 100件処理ごとに停止フラグをチェック
if (jobStopManager.shouldStop(executionId)) {
break;
}
}
}
try {
// ビジネス処理
} finally {
// 必ずリソースをクリーンアップ
jobStopManager.clearStopFlag(executionId);
// その他のリソース解放
}
logger.info("Job停止リクエストを受信しました。実行ID: {}", executionId);
logger.info("Job停止が完了しました。実行ID: {}", executionId);
この設計は企業レベルアプリケーションの高い基準を体現し、多層停止戦略を通じてシステムの信頼性と応答性を確保しています。